package opmq

import (
	"container/list"
	"gitee.com/maomaomaoge/opmq/packets"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)

type ClientsProperty struct {
	Zone             string `json:"zone"`       // 客户端配置组名称
	RecvCnt          int64  `json:"recv_cnt"`   // 接收的 TCP 报文数量
	MaxMqueue        int64  `json:"max_mqueue"` // 默认100
	Node             string `json:"node"`       // 客户端所连接的节点名称, 集群用,目前不做处理
	UserName         string `json:"username"`
	MqueueLen        int64  `json:"mqueue_len"` // 消息丢列当前长度
	MaxInflight      int64  `json:"max_inflight"`
	IsBridge         bool   `json:"is_bridge"`         // 指示客户端是否通过桥接方式连接
	MqueueDropped    int64  `json:"mqueue_dropped"`    // 消息队列因超出长度而丢弃的消息数量
	Inflight         int64  `json:"inflight"`          // 飞行队列当前长度
	HeapSize         int64  `json:"heap_size"`         // 进程堆栈大小，单位：字节
	MaxSubscriptions int64  `json:"max_subscriptions"` // 此客户端允许建立的最大订阅数量
	ProtoName        string `json:"proto_name"`        // MQTT
	CreatedAt        string `json:"created_at"`
	ProtoVer         int64  `json:"proto_ver"` // 客户端使用的协议版本
	Reductions       int64  `json:"reductions"`
	SendMsg          int64  `json:"send_msg"`     // 发送的 PUBLISH 报文数量
	IpAddress        string `json:"ip_address"`   // 客户端 IP 地址
	SendCnt          int64  `json:"send_cnt"`     // 发送的 TCP 报文数量
	MailboxLen       int64  `json:"mailbox_len"`  // 进程邮箱大小
	AwaitingRel      int64  `json:"awaiting_rel"` // 未确认的 PUBREC 报文数量
	Keepalive        int64  `json:"keepalive"`
	RecvMsg          int64  `json:"recv_msg"` // 接收的 PUBLISH 报文数量
	SendPkt          int64  `json:"send_pkt"` // 发送的 MQTT 报文数量
	RecvOct          int64  `json:"recv_oct"` // EMQ X Broker（下同）接收的字节数量
	ClientId         string `json:"clientid"`
	CleanStart       bool   `json:"clean_start"`
	ExpiryInterval   int    `json:"expiry_interval"`
	Connected        bool   `json:"connected"`
	Port             int64  `json:"port"`
	SendOct          int64  `json:"send_oct"` // 	发送的字节数量
	RecvPkt          int64  `json:"recv_pkt"` // 接收的 MQTT 报文数量
	ConnectedAt      string `json:"connected_at"`
	MaxAwaitingRel   int64  `json:"max_awaiting_rel"`  // 允许存在未确认的 PUBREC 报文的最大数量
	SubscriptionsCnt int64  `json:"subscriptions_cnt"` // 此客户端已建立的订阅数量
	DisconnectedAt   string `json:"disconnected_at"`   // 断线时间
}

// 客户端的连接属性
type clientConn struct {
	Srv *Server

	Cid  string
	Conn net.Conn

	job chan packets.ControlPacket

	done    chan struct{}
	IsAlive bool

	// 前段的http的连接属性，其中也有统计
	// 特码服了，本来cpu不够用，还尼玛统计
	ClientsProperty *ClientsProperty

	// 掉线数据暂存
	// ⚠️ WARNING: 这里的数据存储一定不要用管道，管道除了关闭链接时候，不知道什么时候关闭
	//
	// goto-retain: 请全局查找前面的单词，找到这个处理注意的分地方
	OnceRetain  int // 大于0时候就是代表已经发送过了
	muxRetain   sync.Mutex
	RetainQueue *list.List

	SubList []string // 订阅列表
}

func NewClientConn(cid string, conn net.Conn, srv *Server) *clientConn {
	c := &clientConn{
		Srv: srv,

		Cid:             cid,
		Conn:            conn,
		job:             make(chan packets.ControlPacket, 100000), //todo: 参数大小
		done:            make(chan struct{}, 0),
		IsAlive:         true,
		ClientsProperty: &ClientsProperty{},
		RetainQueue:     list.New(),
		SubList:         make([]string, 0),
	}

	t := time.Now()
	c.ClientsProperty.ConnectedAt = TimeUnixToString(t.Unix())
	c.ClientsProperty.Connected = true
	// todo: 连接赋值
	split := strings.Split(conn.RemoteAddr().String(), ":")
	if len(split) == 2 {
		cport, err := strconv.Atoi(split[1])
		if err == nil {
			c.ClientsProperty.Port = int64(cport)
		}
		c.ClientsProperty.IpAddress = split[0]
	}
	c.ClientsProperty.ProtoName = "MQTT"
	c.ClientsProperty.ProtoVer = 3
	c.ClientsProperty.MaxMqueue = 100000
	c.ClientsProperty.ClientId = cid

	go c.run()
	return c
}

// Submit 连接接受报文任务
// - 仅仅接受任务，在这一层可以很好的处理连接失活策略
func (c *clientConn) Submit(job packets.ControlPacket) {
	defer func() {
		err := recover()
		if err != nil {
			PANIC.Println(err)
			return
		}
	}()
	//
	//if !c.IsAlive {
	//	data, ok := job.(*packets.PublishPacket)
	//	if ok {
	//		if data.Retain {
	//			c.PushRetain(job)
	//		}
	//	}
	//
	//	return
	//}

	c.job <- job
}

// Disconnect 断开连接
func (cc *clientConn) Disconnect() {
	cc.IsAlive = false
	cc.done <- struct{}{}
	time.Sleep(time.Millisecond * 5)

	close(cc.job)
	cc.Conn.Close()
	cc.Srv.subs.unsubAll(cc)
	cc.ClientsProperty.DisconnectedAt = TimeUnixToString(time.Now().Unix())
	cc.ClientsProperty.Connected = false
}

// PushRetain 对于短线的数据发布，这里是存储仓库
func (cc *clientConn) PushRetain(data packets.ControlPacket) {
	cc.muxRetain.Lock()
	defer cc.muxRetain.Unlock()
	if cc.RetainQueue.Len() > 10000 {
		cc.RetainQueue.Remove(cc.RetainQueue.Front())
	}

	cc.RetainQueue.PushBack(data)
	DEBUG.Println(cc.Cid, "暂存的掉线数据", cc.RetainQueue.Len())
}

// SendRetain
func (cc *clientConn) SendRetain() {
	cc.muxRetain.Lock()
	defer func() {
		cc.muxRetain.Unlock()
		err := recover()
		if err != nil {
			PANIC.Println(err)
			cc.RetainQueue = list.New()
		}
	}()
	//DEBUG.Println(cc.Cid+"需要发送掉线的数据为： ", cc.RetainQueue.Len())
	for {
		if cc.RetainQueue.Len() == 0 || cc.IsAlive == false {
			return
		}

		item := cc.RetainQueue.Front()
		cc.job <- item.Value.(*packets.PublishPacket)
		cc.RetainQueue.Remove(item)
	}
}

func (c *clientConn) run() {
	go c.read()
	go c.write()
}

// write 连接属性真正给客户端发送报文的地方
func (cc *clientConn) write() {
	for job := range cc.job {
		err := job.Write(cc.Conn)
		if err != nil {
			//p, ok := job.(*packets.PublishPacket)
			//if ok {
			//	cc.PushRetain(p)
			//}
		}
		cc.ClientsProperty.SendMsg += 1 // 统计
	}
}

// read 接受来自客户端的报文，并且解析
func (cc *clientConn) read() {
	defer func() {
		err := recover()
		if err != nil {
			PANIC.Println(err)
			return
		}
		cc.Disconnect()
	}()

	for {
		packet, err := packets.ReadPacket(cc.Conn)
		if err != nil {
			//DEBUG.Println(cc.Cid + "断开连接")
			cc.Disconnect()
			return
		}

		switch packet.(type) {
		case *packets.ConnectPacket:
			// 维护属性信息
			cc.ClientsProperty.RecvPkt += 1

			cp := packet.(*packets.ConnectPacket)
			if cc.Srv.Auth.IsUsed {
				if cp.Username != cc.Srv.Auth.User {
					DEBUG.Println("用户名不匹配 收到的用户名和broker的比较", cp.Username, cc.Srv.Auth.User)
					cc.Conn.Close()
					return
				}
				if string(cp.Password) != cc.Srv.Auth.Pwd {
					DEBUG.Println("密码不匹配")
					cc.Conn.Close()
					return
				}
			}

			ack := packets.NewControlPacket(packets.Connack)
			go cc.Submit(ack)

			// 设置clientId
			if cp.ClientIdentifier != "" {
				cc.Cid = cp.ClientIdentifier
				cc.ClientsProperty.ClientId = cp.ClientIdentifier
				cc.ClientsProperty.Keepalive = int64(cp.Keepalive)
				cc.Srv.AddOrUpdateClientConn(cc) // 添加前端视图连接
			}

			if cp.Username != "" {
				cc.ClientsProperty.UserName = cp.Username
			}

		case *packets.SubscribePacket:
			cc.ClientsProperty.RecvPkt += 1
			// 订阅报文经过测试，只能发送Qos为1的报文
			sp := packet.(*packets.SubscribePacket)
			// 协议:
			// From: mqtt中文文档
			// SUBSCRIBE包从客户端发送到服务端创建一个或多个订阅。每个订阅注册客户端感兴趣的一个或多个话题。
			// 服务端向客户端发送PUBLISH包来分发匹配订阅的应用消息。
			// SUBSCRIBE包也（为每个订阅）指定服务端发送给客户端的应用消息的最大QoS。
			// Note: 目前只是支持 Qos 为 0 .
			ack := packets.NewControlPacket(packets.Suback).(*packets.SubackPacket)
			ack.Qos = 0 // always is 1
			ack.MessageID = sp.MessageID
			DEBUG.Println("msg ID", sp.MessageID)
			go cc.Submit(ack)

			// 添加topic
			for _, v := range sp.Topics {
				cc.Srv.subs.Add(v, cc)
			}

			// 如果是开启集群，需要进行进行一次数据同步, 目前只是支持从节点进行数据交换
			cc.Srv.ClusterSelf.NodeJoin(cc.Srv.ClusterSelf.ClusterNet.Ip, cc.Srv.MqttPort, cc.Srv.ClusterSelf.Id, sp.Topics)

			DEBUG.Println("收到订阅报文 cid: ", cc.Cid, " topic: ", sp.Topics)

		case *packets.PingreqPacket:
			cc.ClientsProperty.RecvPkt += 1
			//DEBUG.Println(cc.Cid + "ping 报文")
			ack := packets.NewControlPacket(packets.Pingresp)
			go cc.Submit(ack)

		case *packets.PublishPacket:
			cc.ClientsProperty.RecvPkt += 1
			cc.ClientsProperty.RecvMsg += 1

			pp := packet.(*packets.PublishPacket)

			//DEBUG.Println("发布报文")
			// todo: 假的qos为2
			if pp.Qos == 2 {
				ack1 := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
				ack1.MessageID = pp.MessageID
				ack1.Write(cc.Conn)

				ack2 := packets.NewControlPacket(packets.Pubrel).(*packets.PubrelPacket)
				ack2.MessageID = pp.MessageID
				ack2.Write(cc.Conn)

				ack3 := packets.NewControlPacket(packets.Pubcomp).(*packets.PubcompPacket)
				ack3.MessageID = pp.MessageID
				ack3.Write(cc.Conn)
			}

			ack0 := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
			ack0.MessageID = pp.MessageID
			go cc.Submit(ack0)

			cc.Srv.subs.Submit(pp)
			//DEBUG.Println("发布报文retain字段: ", packet.(*packets.PublishPacket).Retain)

		case *packets.DisconnectPacket:
			cc.ClientsProperty.RecvPkt += 1
			//DEBUG.Println("收到断开连接报文")
			return

		case *packets.UnsubscribePacket:
			cc.ClientsProperty.RecvPkt += 1
			cc.ClientsProperty.SendCnt += 1

			p := packet.(*packets.UnsubscribePacket)
			ack := packets.NewControlPacket(packets.Unsuback).(*packets.UnsubackPacket)
			ack.MessageID = p.MessageID
			ack.Write(cc.Conn)
			for _, v := range p.Topics {
				cc.Srv.subs.unsub(v, cc)
			}

			DEBUG.Println(cc.Cid+"取消订阅", " messageId: ", p.MessageID, "topic: ", p.Topics)

		default:
			// be useful for cpu
			time.Sleep(time.Millisecond)
		}
	}
}
